-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-4460] Allow ProcessFunction on non-keyed streams #3438
Conversation
I looked over the changes and didn't find anything critical. The only thing that made me thinking was the boxed Tests, Scala API were done. I assume that we don't need to explicitly mention support for the process function on non-keyed streams. |
@rmetzger Yes, it's unfortunate that in our model not all elements always have a timestamp. The other alternative is throwing an exception when trying to access a non-existing timestamp. |
In addition to throwing an exception, we should also expose |
I think the discussion of timestamps and additional interfaces is orthogonal to this PR: |
* | ||
* @param <I> Type of the input elements. | ||
* @param <O> Type of the output elements. | ||
*/ | ||
@PublicEvolving | ||
public interface ProcessFunction<I, O> extends Function { | ||
public abstract class ProcessFunction<I, O> extends AbstractRichFunction { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hi, changing form interface
to class
is incompatible on the user side. Can't ProcessFunction just extend RichFunction?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the problem is that we need a default implementation for onTimer(long, OnTimerContext, Collector)
(see below).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hi @wenlong88 in the ML discussion (https://lists.apache.org/thread.html/f3fe7d68986877994ad6b66173f40e72fc454420720a74ea5a834cc2@%3Cdev.flink.apache.org%3E) we decided to make ProcessFunction
available on non-keyed streams as well to allow using side outputs there. This requires making the onTimer()
method abstract, otherwise every user would always have to implement it. We marked ProcessFunction
as @PublicEvolcing
just for such cases; it's still a very young API and we didn't know exactly what was going to be needed in the end.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool change! I'm OK with the change from interface
to abstract class
. Do we need to update the documentation for any of the changes? If yes, I would make this part of this PR.
I had some inline comments that you can have a look at before merging. Other than that, +1 to merge this.
* @return The transformed {@link DataStream}. | ||
*/ | ||
@Internal | ||
public <R> SingleOutputStreamOperator<R> process( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this internal method only exposed as public
for the Scala API? If yes, I'm wondering if it makes sense to call transform
manually in the Scala DataStream
API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's exposed for that. The pattern, so far, is for methods to also expose a public method that takes a TypeInformation
because we get the TypeInformation
from the context bound in the Scala API.
Calling transform()
manually is an option but if we do that we would basically not base the Scala API on the Java API anymore and we would have code that instantiates the Stream Operators in both the Java and Scala API. For example, right now we have the code for instantiating a flat map operator in (Java)DataStream
while (Scala)DataStream.flatMap()
calls that method.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense to keep it like that. The benefits to base the Scala API on top of the Java API instead of duplicating it are very persuasive, too. 😄 So +1 to keep it as is. 👍 I was just wondering whether users would be confused by this.
* | ||
* <p><b>NOTE:</b> A {@code ProcessFunction} is always a | ||
* {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the | ||
* {@link org.apache.flink.api.common.functions.RuntimeContext} as always available and setup and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: as => is?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixing
* | ||
* @param <I> Type of the input elements. | ||
* @param <O> Type of the output elements. | ||
*/ | ||
@PublicEvolving | ||
public interface ProcessFunction<I, O> extends Function { | ||
public abstract class ProcessFunction<I, O> extends AbstractRichFunction { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the problem is that we need a default implementation for onTimer(long, OnTimerContext, Collector)
(see below).
* | ||
* @param <I> Type of the input elements. | ||
* @param <O> Type of the output elements. | ||
*/ | ||
@PublicEvolving | ||
public interface ProcessFunction<I, O> extends Function { | ||
public abstract class ProcessFunction<I, O> extends AbstractRichFunction { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing serialVersionUID
@uce There is not issue for 2.0 to track this because I don't think there is consensus about always having timestamps. |
@uce There is some documentation that says that |
This is in preparation of allowing ProcessFunction on DataStream because we will use it to allow side outputs from the ProcessFunction Context.
Introduce new ProcessOperator for this. Rename the pre-existing ProcessOperator to KeyedProcessOperator.
…thod This is in preparation of allowing CoProcessFunction on a non-keyed connected stream. we will use it to allow side outputs from the ProcessFunction Context.
Introduce new CoProcessOperator for this. Rename the pre-existing CoProcessOperator to KeyedCoProcessOperator.
a26accf
to
746c1ef
Compare
Merged |
thanks for explaination, I have such concern because we have just suggested our users to use processFunction to implement their jobs, they need to change their code too when we sync the cimmit.after all, it is really nice to have timer in more scenarios. |
This is in preparation for side outputs, which will only work on
ProcessFunction
. We still want side outputs on non-keyed streams so we have to makeProcessFunction
available there.See this ML thread for reference: https://lists.apache.org/thread.html/f3fe7d68986877994ad6b66173f40e72fc454420720a74ea5a834cc2@%3Cdev.flink.apache.org%3E